package org.example.dobs.demo.flink.wc.source.custom;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * A custom source that supports parallelism: it emits an increasing sequence of
 * {@code long} values starting at 1, one value per second.
 *
 * <p>Each instance keeps its own counter, so when the source is run with a
 * parallelism greater than one, every value is produced once per parallel
 * instance (with parallelism 2 the downstream operators see each number twice).
 */
public class MySourceFunction_V2 implements ParallelSourceFunction<Long> {
    /**
     * Loop flag for {@link #run}. Declared {@code volatile} because
     * {@link #cancel()} is invoked from a different thread than the one
     * executing {@link #run}; without it the update may never become visible
     * to the emitting loop and the source would not stop.
     */
    private volatile boolean isRunning = true;

    /** Next value to emit; only touched by the thread executing {@link #run}. */
    private long number = 1L;

    /**
     * Emits the current counter value once per second until the source is cancelled.
     *
     * @param sourceContext context used to emit elements downstream
     * @throws Exception if the sleep between two emissions is interrupted
     */
    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(number++);
            // Throttle the source to one element per second.
            Thread.sleep(1000);
        }
    }

    /** Requests termination of the emitting loop in {@link #run}. */
    @Override
    public void cancel() {
        isRunning = false;
    }


    /**
     * Demo job: reads from this source with parallelism 2, logs every element,
     * keeps only the even numbers and prints them through a single sink instance.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        /*
         * Multiple parallelism.
         * If a custom source reads a file, then with the parallelism set to 2
         * the file would be read twice (once per parallel instance).
         */
        // step 1: Flink execution environment (local, with web UI)
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // step 2: input
        DataStreamSource<Long> numberStream = see.addSource(new MySourceFunction_V2()).setParallelism(2);

        // step 3: transform
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {

            @Override
            public Long map(Long aLong) throws Exception {
                // Identity mapping; only logs each element that passes through.
                System.out.println("输出元素： " + aLong);
                return aLong;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                // Keep even numbers only.
                return aLong % 2 == 0;
            }
        });

        // step 4: output
        filterDataStream.print().setParallelism(1);

        // finally: submit and run the Flink job
        see.execute("StreamingDemoWithCustomSource");
    }
}
